package com.superid.clients;

import com.superid.config.KafkaCommonConfig;
import com.superid.config.KafkaConsumerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Properties;

/**
 * @author dufeng
 * @create: 2018-08-08 15:54
 */
@Component
public class AvroKafkaConsumer implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(AvroKafkaConsumer.class);
    @Autowired
    private KafkaCommonConfig commonConfig;
    @Autowired
    private KafkaConsumerConfig consumerConfig;

    private Consumer<GenericData.Record, GenericData.Record> consumer;

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("kafka config="+commonConfig);
        logger.info("consumerConfig config="+consumerConfig);

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, commonConfig.getBootstrapServers());
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, commonConfig.getSchemaRegistryUrl());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.isEnableAutoCommit());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerConfig.getGroupId());
        consumer = new KafkaConsumer<>(props);

    }


    public void consume(List<String> topics) {
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<GenericData.Record, GenericData.Record> records = consumer.poll(1000);

                for (ConsumerRecord<GenericData.Record, GenericData.Record> record : records) {
                    logger.info("record=" + record);
                    logger.info("topic = {},offset = {}, key = {}, value = {} \n", record.topic(), record.offset(), record.key(), record.value());

                }

                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }

    }
}
